feat(datafusion): add catalog-level temporary table support#309
feat(datafusion): add catalog-level temporary table support#309JingsongLi merged 2 commits intoapache:mainfrom
Conversation
ceacfed to
75c2fec
Compare
jerry-024
left a comment
There was a problem hiding this comment.
Thanks for the comprehensive work on catalog-scoped temp tables! The core design is solid — scoping temp tables per-catalog, the TempTableTracker RAII pattern, and migrating all tests from datafusion.public to the Paimon catalog path are all good improvements.
Below are inline comments on the issues I found, prioritized by severity.
P2 — Suggest fixing (non-blocking):
-
looks_like_create_tablealso matchesCREATE TEMP VIEW(sql_context.rs:1273): The TEMPORARY/TEMP skip doesn't distinguish TABLE from VIEW. If aCREATE TEMPORARY VIEWever contains Paimon-specific syntax likePARTITIONED BY, the pre-parser would incorrectly try to extract partition keys. Currently not triggered but is a latent bug. -
bindings/python/README.md— Usage section removed without replacement: Users reading the README on GitHub/PyPI lose the quick-start guide. Consider adding a link toproject-description.mdor restoring a minimal example. -
Python
register_batchlacks negative test: No test for callingregister_batchwith an invalid/unknown catalog name.
| /// | ||
| /// Returns the table name, version/tag value, and byte range of the full clause. | ||
| fn extract_version_as_of(sql: &str) -> Option<VersionAsOfInfo> { | ||
| let lower = sql.to_lowercase(); |
There was a problem hiding this comment.
P0 — Must fix (Java Paimon compatibility)
lower.find("version as of ") is a naive string search that will fail on valid SQL patterns that work in Java Paimon (Flink/Spark):
- JOIN two time-travel tables:
SELECT * FROM t1 VERSION AS OF 1 JOIN t2 VERSION AS OF 2 ON ...— only the first match is found; the second is silently ignored. - Subquery / CTE:
WITH cte AS (SELECT * FROM t VERSION AS OF 1) SELECT * FROM cte—clause_rangereplacement corrupts the SQL structure. - String literal false positive:
WHERE note = 'version as of 1'— matches inside a string constant. - Table alias after clause:
FROM t VERSION AS OF 1 AS tt— replacement range doesn't account for the alias.
In Java Paimon, all of these are handled correctly by the SQL parser. Users migrating SQL from Flink/Spark will hit unexpected failures.
Suggestion: Consider a two-pass approach — first extract and strip time-travel clauses with awareness of quoted strings and parenthesis nesting, then hand the cleaned SQL to Parser::parse_sql. At minimum, add a quoted-string skip to avoid false positives inside string literals.
| ); | ||
|
|
||
| // Register the provider under the UUID temp table name | ||
| self.register_temp_table(uuid_name.as_str(), provider)?; |
There was a problem hiding this comment.
P1 — Temp table leak on panic
If self.ctx.sql() panics, this temp table is never cleaned up. This PR already introduces TempTableTracker as an RAII guard for exactly this pattern (used in execute_cow_delete_once, execute_cow_merge_once, etc.), but it's not used here.
Suggested fix:
let mut tracker = TempTableTracker::new(self);
self.register_temp_table(uuid_name.as_str(), provider)?;
tracker.register(&uuid_name);
let result = self.ctx.sql(&rewritten_sql).await;
// tracker auto-deregisters on drop
result| - `"database.my_table"` — uses the current catalog with the specified database | ||
| - `"catalog.database.my_table"` — fully qualified | ||
|
|
||
| ### register_mem_table |
There was a problem hiding this comment.
P1 — Non-existent API
register_mem_table is documented here but doesn't exist in the code — the PR implements register_temp_table instead. Either add register_mem_table as a convenience wrapper, or update the docs to reference register_temp_table.
| ) -> DFResult<()> { | ||
| // Check if a temp table with this name already exists | ||
| { | ||
| let databases = self.temp_tables.read().unwrap_or_else(|e| e.into_inner()); |
There was a problem hiding this comment.
P1 — TOCTOU between existence check and registration
This read-lock check and the actual registration via get_or_create_temp_database (line 254) are done in separate lock acquisitions. Between them, there's a blocking block_on_with_runtime call (shadow warning, lines 235-252) where another thread could register a table with the same name.
Since register_temp_table is a public API, consider holding the write lock for the entire check-then-register operation, or using MemorySchemaProvider::register_table's return value to detect the conflict atomically.
| let (catalog, _catalog_name, identifier) = self.resolve_table_name_from_ref(&table_ref)?; | ||
|
|
||
| let paimon_table = catalog | ||
| .get_table(&identifier) |
There was a problem hiding this comment.
P0 — Must fix (dynamic options bypass)
catalog.get_table(&identifier) here loads the table directly from the Paimon catalog, bypassing the normal PaimonSchemaProvider path. This means session-scoped dynamic options set via SET 'paimon.xxx' = ... are not applied to time-travel queries.
For example:
SET 'paimon.blob-as-descriptor' = 'true';
SELECT * FROM paimon.default.my_table VERSION AS OF 1; -- blob-as-descriptor NOT appliedThe normal table loading path in PaimonSchemaProvider::table() applies dynamic_options before creating the provider. Time-travel should do the same — either pass self.dynamic_options to copy_with_options alongside the time-travel options, or go through the schema provider.
| let statements = Parser::parse_sql(&dialect, &rewritten_sql) | ||
| let sql_lower = rewritten_sql.to_lowercase(); | ||
| let has_time_travel = | ||
| sql_lower.contains("version as of") || sql_lower.contains("timestamp as of"); |
There was a problem hiding this comment.
P0 — Must fix (false positive on string literals)
This contains() check is the entry gate that short-circuits all subsequent SQL parsing. Any SQL containing the substring version as of or timestamp as of — even inside a string literal or comment — will skip Parser::parse_sql entirely and enter the regex-based time-travel path.
Examples that would be misrouted:
SELECT * FROM t WHERE note = 'version as of 1'
-- comment: version as of something
INSERT INTO t VALUES ('timestamp as of now')This needs at minimum a quoted-string-aware scan before deciding to treat the query as time-travel. One approach: skip over single-quoted strings ('...') and SQL comments (-- / /* */) before checking for the keyword.
| self.ctx.sql(sql).await | ||
| } | ||
| Statement::Truncate(truncate) => self.handle_truncate_table(truncate).await, | ||
| Statement::CreateView(create_view) => self.handle_create_view(create_view).await, |
There was a problem hiding this comment.
P0 — Must fix (breaks non-Paimon catalogs)
This intercepts all CREATE VIEW statements, and the Statement::Drop arm below (line 385) intercepts all DROP TABLE / DROP VIEW. Before this PR, these would fall through to self.ctx.sql(sql) and be handled by DataFusion normally.
Now:
CREATE VIEW datafusion.public.my_view AS ...→ errors with "CREATE VIEW (non-temporary) is not supported" (line 1113), even though DataFusion can handle it.DROP TABLE datafusion.public.x→resolve_catalog_and_tablelooks up only Paimon catalogs and fails with "Unknown catalog 'datafusion'".
Suggestion: Only intercept these statements when the resolved catalog is a registered Paimon catalog. If the catalog name doesn't exist in self.catalogs, fall through to self.ctx.sql(sql).await for DataFusion's default handling.
|
Thanks @jerry-024 for the review, addressed comments. |
Purpose
This PR introduces catalog-level temporary table and view support for the DataFusion integration. Previously, temporary tables were not scoped to a specific Paimon catalog. This change allows users to create, query, and drop temporary tables and views within any registered Paimon catalog using standard SQL syntax.
Also add Python API:
Brief change log
Tests
API and Format
Documentation